Python example using Spark SQL over Cloudant as a source

This sample notebook is written in Python and expects the Python 3.5 runtime. Make sure the kernel is started and that you are connected to it when executing this notebook. The data source for this example can be found at http://examples.cloudant.com/crimes/. Replicate the database into your own Cloudant account before you execute this script.

This Python notebook showcases how to use the SQL-Cloudant connector. The code reads data from Cloudant, creates a DataFrame from it, filters that data down to only the crime incidents whose nature code indicates a public disturbance, and then writes those 7 documents to another Cloudant database.

Watch the video

Once you import this notebook into Watson Studio, you will see an embedded video that walks you through the notebook.

1. Work with SparkSession

Import and initialize SparkSession.


In [ ]:
from pyspark.sql import SparkSession

In [ ]:
spark = SparkSession.builder.getOrCreate()

2. Work with a Cloudant database

A DataFrame object can be created directly from a Cloudant database. To configure the database as the source, pass these options:

1 - the package name that provides the classes (like CloudantDataSource) implemented in the connector to extend BaseRelation. For the SQL-Cloudant connector, this is org.apache.bahir.cloudant

2 - the cloudant.host parameter to pass the Cloudant account name

3 - the cloudant.username parameter to pass the Cloudant user name

4 - the cloudant.password parameter to pass the Cloudant account password

5 - the name of the database to load


In [ ]:
cloudantdata = spark.read.format("org.apache.bahir.cloudant")\
    .option("cloudant.host","xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-bluemix.cloudant.com")\
    .option("cloudant.username", "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-bluemix")\
    .option("cloudant.password","xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")\
    .load("crimes")

3. Work with a DataFrame

At this point, all transformations and functions behave as documented for Spark SQL (http://spark.apache.org/sql/).
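
For example, you can register the DataFrame as a temporary view and query it with plain SQL; a minimal sketch (the view name crimes_view is arbitrary):

In [ ]:
# Register the DataFrame as a temporary view and run a SQL query against it
cloudantdata.createOrReplaceTempView("crimes_view")
spark.sql("SELECT properties.naturecode, COUNT(*) AS total "
          "FROM crimes_view GROUP BY properties.naturecode").show()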


In [ ]:
# This code prints the schema and a record count
cloudantdata.printSchema()
cloudantdata.count()

In [ ]:
# This code displays the values of the naturecode field
cloudantdata.select("properties.naturecode").show()
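
If you want each code listed only once rather than one row per document, a small variation adds distinct():

In [ ]:
# This code displays each distinct naturecode value once
cloudantdata.select("properties.naturecode").distinct().show()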

In [ ]:
# This code filters the data to just those records with a naturecode of "DISTRB", and then displays that data
disturbDF = cloudantdata.filter("properties.naturecode = 'DISTRB'")
disturbDF.show()
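
The same filter can also be expressed with the DataFrame column API instead of a SQL expression string; an equivalent sketch:

In [ ]:
# This code is equivalent to the filter above, using the column API
from pyspark.sql.functions import col
disturbDF = cloudantdata.filter(col("properties.naturecode") == "DISTRB")
disturbDF.show()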

In [ ]:
# This code writes the filtered data to a Cloudant database called crimes_filtered.
# The createDBOnSave option creates the target database if it does not already exist.
disturbDF.select("properties").write.format("org.apache.bahir.cloudant")\
     .option("cloudant.host","xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-bluemix.cloudant.com")\
     .option("cloudant.username", "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx-bluemix")\
     .option("cloudant.password","xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx")\
     .option("createDBOnSave", "true")\
     .save("crimes_filtered")

In [ ]:
# Next, you'll see how to create a visualization of the crimes data. 
# First, this line creates a DataFrame containing all of the naturecodes and a count of the crime incidents for each code.
reducedValue = cloudantdata.groupBy("properties.naturecode").count()
reducedValue.printSchema()
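
Before plotting, you can sort the aggregated counts and display the most frequent codes to sanity-check the result; a small sketch:

In [ ]:
# This code sorts the aggregated counts in descending order and displays the top 10 codes
from pyspark.sql.functions import desc
reducedValue.orderBy(desc("count")).show(10)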

4. Generate visualizations


In [ ]:
# These lines import two Python modules. The pprint module helps to produce pretty representations of data structures,
# and the Counter subclass from the collections module helps to count hashable objects.
import pprint
from collections import Counter

In [ ]:
# These lines import PySpark classes for Spark SQL and DataFrames.
from pyspark.sql import *
from pyspark.sql.functions import udf, asc, desc
from pyspark import SparkContext, SparkConf
from pyspark.sql.types import IntegerType

In [ ]:
# This code converts the Apache Spark DataFrame to a pandas DataFrame, sorting by count first
# and then by naturecode second in order to produce a sorted graph later.
import pandas as pd
pandaDF = reducedValue.orderBy(desc("count"), asc("naturecode")).toPandas()
print(pandaDF)

In [ ]:
# This is needed to actually see the plots
%matplotlib inline

# This line imports matplotlib.pyplot, a collection of command-style functions that make matplotlib work like MATLAB
import matplotlib.pyplot as plt

In [ ]:
# These lines assign the data to the values and labels objects.
values = pandaDF['count']
labels = pandaDF['naturecode']

# These lines provide the format for the plot.
plt.gcf().set_size_inches(16, 12, forward=True)
plt.title('Number of crimes by type')

# These lines specify that the plot should display as a horizontal bar chart, with the values on the x-axis
# and the labels on the y-axis
plt.barh(range(len(values)), values)
plt.yticks(range(len(values)), labels)

# This last line displays the plot
plt.show()
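
If you also want the chart as an image file, call plt.savefig before plt.show, because the inline backend clears the figure once it is displayed; a sketch (the file name crimes_by_type.png is arbitrary):

In [ ]:
# This code redraws the chart and saves it as a PNG before displaying it
plt.gcf().set_size_inches(16, 12, forward=True)
plt.title('Number of crimes by type')
plt.barh(range(len(values)), values)
plt.yticks(range(len(values)), labels)
plt.savefig('crimes_by_type.png', bbox_inches='tight')  # hypothetical file name
plt.show()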
